---
title: DASE Components Explained (Product Ranking)
---

<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements.  See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

<%= partial 'shared/dase/dase', locals: { template_name: 'Product Ranking Template' } %>

## The Engine Design

As you can see from the Quick Start, *MyProductRanking* takes a JSON prediction
query, e.g.  `{ "user": "u2", "items": ["i1", "i3", "i10", "i2", "i5", "i31", "i9"] }`, and return a JSON predicted result.
In MyProductRanking/src/main/scala/***Engine.scala***, the `Query` case class
defines the format of such **query**:

```scala
case class Query(
  user: String,
  items: List[String]
) extends Serializable
```

The `PredictedResult` case class defines the format of **predicted result**,
such as

```json
{"itemScores":[
  {"item":"i5","score":1.0038217983580324},
  {"item":"i3","score":0.00598658734782459},
  {"item":"i2","score":0.004048103059012265},
  {"item":"i9","score":-1.966935819737517E-4},
  {"item":"i1","score":-0.0016841195307744916},
  {"item":"i31","score":-0.0019770986240634503},
  {"item":"i10","score":-0.0031498317618844918}],
  "isOriginal":false}
```

with:

```scala
case class PredictedResult(
  itemScores: Array[ItemScore],
  isOriginal: Boolean // set to true if the items are not ranked at all.
) extends Serializable

case class ItemScore(
  item: String,
  score: Double
) extends Serializable
```

Finally, `ProductRankingEngine` is the *Engine Factory* that defines the
components this engine will use: Data Source, Data Preparator, Algorithm(s) and
Serving components.

```scala
object ProductRankingEngine extends IEngineFactory {
  def apply() = {
    new Engine(
      classOf[DataSource],
      classOf[Preparator],
      Map("als" -> classOf[ALSAlgorithm]),
      classOf[Serving])
  }
}
```

### Spark MLlib

The PredictionIO Product Ranking Engine Template integrates Spark's MLlib ALS algorithm under the DASE
architecture. We will take a closer look at the DASE code below.

The MLlib ALS algorithm takes training data of RDD type, i.e. `RDD[Rating]` and train a model, which is a `MatrixFactorizationModel` object.

You can visit [here](https://spark.apache.org/docs/latest/mllib-collaborative-filtering.html) to learn more about MLlib's ALS collaborative filtering algorithm.


## Data

In the DASE architecture, data is prepared by 2 components sequentially: *DataSource* and *DataPreparator*. They take data
from the data store and prepare them for Algorithm.

### Data Source

In MyProductRanking/src/main/scala/***DataSource.scala***, the `readTraining`
method of class `DataSource` reads and selects data from the *Event Store*
(data store of the *Event Server*). It returns `TrainingData`.

```scala
case class DataSourceParams(appName: String) extends Params

class DataSource(val dsp: DataSourceParams)
  extends PDataSource[TrainingData,
      EmptyEvaluationInfo, Query, EmptyActualResult] {

  @transient lazy val logger = Logger[this.type]

  override
  def readTraining(sc: SparkContext): TrainingData = {

    // create a RDD of (entityID, User)
    val usersRDD: RDD[(String, User)] = PEventStore.aggregateProperties(...) ...

    // create a RDD of (entityID, Item)
    val itemsRDD: RDD[(String, Item)] = PEventStore.aggregateProperties(...) ...

    // get all "user" "view" "item" events
    val viewEventsRDD: RDD[ViewEvent] = PEventStore.find(...) ...

    new TrainingData(
      users = usersRDD,
      items = itemsRDD,
      viewEvents = viewEventsRDD
    )
  }
}
```

PredictionIO automatically loads the parameters of *datasource* specified in MyProductRanking/***engine.json***, including *appName*, to `dsp`.

In ***engine.json***:

```
{
  ...
  "datasource": {
    "params" : {
      "appName": "MyApp1"
    }
  },
  ...
}
```

In `readTraining()`, `PEventStore` is an object which provides function to access data that is collected by PredictionIO Event Server.

This Product Ranking Engine Template requires "user" and "item" entities that are set by events.

`PEventStore.aggregateProperties(...)` aggregates properties of the `user` and `item` that are set, unset, or delete by special events **$set**, **$unset** and **$delete**. Please refer to [Event API](/datacollection/eventapi/#note-about-properties) for more details of using these events.

The following code aggregates the properties of `user` and then map each result to a `User()` object.

```scala

    // create a RDD of (entityID, User)
    val usersRDD: RDD[(String, User)] = PEventStore.aggregateProperties(
      appName = dsp.appName,
      entityType = "user"
    )(sc).map { case (entityId, properties) =>
      val user = try {
        // placeholder for expanding user properties
        User()
      } catch {
        case e: Exception => {
          logger.error(s"Failed to get properties ${properties} of" +
            s" user ${entityId}. Exception: ${e}.")
          throw e
        }
      }
      (entityId, user)
    }.cache()

```
In the template, `User()` object is a simple dummy as a placeholder for you to customize and expand.


Similarly, the following code aggregates the properties of `item` and then map each result to a `Item()` object.

```scala
    // create a RDD of (entityID, Item)
    val itemsRDD: RDD[(String, Item)] = PEventStore.aggregateProperties(
      appName = dsp.appName,
      entityType = "item"
    )(sc).map { case (entityId, properties) =>
      val item = try {
        // placeholder for expanding item properties
        Item()
      } catch {
        case e: Exception => {
          logger.error(s"Failed to get properties ${properties} of" +
            s" item ${entityId}. Exception: ${e}.")
          throw e
        }
      }
      (entityId, item)
    }.cache()
```

In the template, `Item()` object is a simple dummy as a placeholder for you to customize and expand.

`PEventStore.find(...)` specifies the events that you want to read. In this case, "user view item" events are read and then each is mapped to a `ViewEvent()` object.

```scala

    // get all "user" "view" "item" events
    val viewEventsRDD: RDD[ViewEvent] = PEventStore.find(
      appName = dsp.appName,
      entityType = Some("user"),
      eventNames = Some(List("view")),
      // targetEntityType is optional field of an event.
      targetEntityType = Some(Some("item")))(sc)
      // PEventStore.find() returns RDD[Event]
      .map { event =>
        val viewEvent = try {
          event.event match {
            case "view" => ViewEvent(
              user = event.entityId,
              item = event.targetEntityId.get,
              t = event.eventTime.getMillis)
            case _ => throw new Exception(s"Unexpected event ${event} is read.")
          }
        } catch {
          case e: Exception => {
            logger.error(s"Cannot convert ${event} to ViewEvent." +
              s" Exception: ${e}.")
            throw e
          }
        }
        viewEvent
      }.cache()

```

`ViewEvent` case class is defined as:

```scala
case class ViewEvent(user: String, item: String, t: Long)
```

INFO: For flexibility, this template is designed to support user ID and item ID in String.

`TrainingData` contains an RDD of `User`, `Item` and `ViewEvent` objects. The class definition of `TrainingData` is:

```scala
class TrainingData(
  val users: RDD[(String, User)],
  val items: RDD[(String, Item)],
  val viewEvents: RDD[ViewEvent]
) extends Serializable { ... }
```

PredictionIO then passes the returned `TrainingData` object to *Data Preparator*.

### Data Preparator

In MyProductRanking/src/main/scala/***Preparator.scala***, the `prepare` method
of class `Preparator` takes `TrainingData` as its input and performs any
necessary feature selection and data processing tasks. At the end, it returns
`PreparedData` which should contain the data *Algorithm* needs.

By default, `prepare` simply copies the unprocessed `TrainingData` data to `PreparedData`:

```scala
class Preparator
  extends PPreparator[TrainingData, PreparedData] {

  def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = {
    new PreparedData(
      users = trainingData.users,
      items = trainingData.items,
      viewEvents = trainingData.viewEvents)
  }
}

class PreparedData(
  val users: RDD[(String, User)],
  val items: RDD[(String, Item)],
  val viewEvents: RDD[ViewEvent]
) extends Serializable
```

PredictionIO passes the returned `PreparedData` object to Algorithm's `train` function.

## Algorithm

In MyProductRanking/src/main/scala/***ALSAlgorithm.scala***, the two methods of
the algorithm class are `train` and `predict`. `train` is responsible for
training the predictive model;`predict` is
responsible for using this model to make prediction.

### train(...)

`train` is called when you run **pio train**. This is where MLlib ALS algorithm,
i.e. `ALS.trainImplicit()`, is used to train a predictive model.


```scala
  def train(sc: SparkContext, data: PreparedData): ALSModel = {

    ...

    // create User and item's String ID to integer index BiMap
    val userStringIntMap = BiMap.stringInt(data.users.keys)
    val itemStringIntMap = BiMap.stringInt(data.items.keys)

    val mllibRatings = data.viewEvents
      .map { r =>
        // Convert user and item String IDs to Int index for MLlib
        val uindex = userStringIntMap.getOrElse(r.user, -1)
        val iindex = itemStringIntMap.getOrElse(r.item, -1)

        if (uindex == -1)
          logger.info(s"Couldn't convert nonexistent user ID ${r.user}"
            + " to Int index.")

        if (iindex == -1)
          logger.info(s"Couldn't convert nonexistent item ID ${r.item}"
            + " to Int index.")

        ((uindex, iindex), 1)
      }.filter { case ((u, i), v) =>
        // keep events with valid user and item index
        (u != -1) && (i != -1)
      }.reduceByKey(_ + _) // aggregate all view events of same user-item pair
      .map { case ((u, i), v) =>
        // MLlibRating requires integer index for user and item
        MLlibRating(u, i, v)
      }

    // MLLib ALS cannot handle empty training data.
    require(!mllibRatings.take(1).isEmpty,
      s"mllibRatings cannot be empty." +
      " Please check if your events contain valid user and item ID.")

    // seed for MLlib ALS
    val seed = ap.seed.getOrElse(System.nanoTime)

    val m = ALS.trainImplicit(
      ratings = mllibRatings,
      rank = ap.rank,
      iterations = ap.numIterations,
      lambda = ap.lambda,
      blocks = -1,
      alpha = 1.0,
      seed = seed)

    new ALSModel(
      rank = m.rank,
      userFeatures = m.userFeatures.collectAsMap.toMap,
      productFeatures = m.productFeatures.collectAsMap.toMap,
      userStringIntMap = userStringIntMap,
      itemStringIntMap = itemStringIntMap
    )
  }
```

#### Working with Spark MLlib's ALS.trainImplicit(....)

MLlib ALS does not support `String` user ID and item ID. `ALS.trainImplicit` thus also assumes int-only `Rating` object. First, you can rename MLlib's Integer-only `Rating` to `MLlibRating` for clarity:

```
import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}
```

In order to use MLlib's ALS algorithm, we need to convert the `viewEvents` into `MLlibRating`. There are two things we need to handle:

1. Map user and item String ID of the ViewEvent into Integer ID, as required by `MLlibRating`.
2. `ViewEvent` object is an implicit event that does not have an explicit rating value. `ALS.trainImplicit()` supports implicit preference. If the `MLlibRating` has higher rating value, it means higher confidence that the user prefers the item. Hence we can aggregate how many times the user has viewed the item to indicate the confidence level that the user may prefer the item.

You create a bi-directional map with `BiMap.stringInt` which maps each String record to an Integer index.

```scala
val userStringIntMap = BiMap.stringInt(data.users.keys)
val itemStringIntMap = BiMap.stringInt(data.items.keys)
```

Then convert the user and item String ID in each ViewEvent to Int with these BiMaps. We use default -1 if the user or item String ID couldn't be found in the BiMap and filter out these events with invalid user and item ID later. After filtering, we use `reduceByKey()` to add up all values for the same key (uindex, iindex) and then finally map to `MLlibRating` object.

```scala

val mllibRatings = data.viewEvents
  .map { r =>
    // Convert user and item String IDs to Int index for MLlib
    val uindex = userStringIntMap.getOrElse(r.user, -1)
    val iindex = itemStringIntMap.getOrElse(r.item, -1)

    if (uindex == -1)
      logger.info(s"Couldn't convert nonexistent user ID ${r.user}"
        + " to Int index.")

    if (iindex == -1)
      logger.info(s"Couldn't convert nonexistent item ID ${r.item}"
        + " to Int index.")

    ((uindex, iindex), 1)
  }.filter { case ((u, i), v) =>
    // keep events with valid user and item index
    (u != -1) && (i != -1)
  }.reduceByKey(_ + _) // aggregate all view events of same user-item pair
  .map { case ((u, i), v) =>
    // MLlibRating requires integer index for user and item
    MLlibRating(u, i, v)
  }

```

In addition to `RDD[MLlibRating]`, `ALS.trainImplicit` takes the following parameters: *rank*, *iterations*, *lambda* and *seed*.

The values of these parameters are specified in *algorithms* of
MyProductRanking/***engine.json***:

```
{
  ...
  "algorithms": [
    {
      "name": "als",
      "params": {
        "rank": 10,
        "numIterations": 20,
        "lambda": 0.01,
        "seed": 3
      }
    }
  ]
  ...
}
```

PredictionIO will automatically loads these values into the constructor `ap`,
which has a corresponding case class `ALSAlgorithmParams`:

```scala
case class ALSAlgorithmParams(
  rank: Int,
  numIterations: Int,
  lambda: Double,
  seed: Option[Long]) extends Params
```

The `seed` parameter is an optional parameter, which is used by MLlib ALS algorithm internally to generate random values. If the `seed` is not specified, current system time would be used and hence each train may produce different results. Specify a fixed value for the `seed` if you want to have deterministic result (For example, when you are testing).

`ALS.trainImplicit()` then returns a `MatrixFactorizationModel` model which contains two RDDs: userFeatures and productFeatures. They correspond to the user X latent features matrix and item X latent features matrix, respectively. In this case, we will make use of both userFeatures and productFeatures matrix to rank the items for the user. These matrixes are stored as local model. You could see the `ALSModel` class is defined as:

```scala
class ALSModel(
  val rank: Int,
  val userFeatures: Map[Int, Array[Double]],
  val productFeatures: Map[Int, Array[Double]],
  val userStringIntMap: BiMap[String, Int],
  val itemStringIntMap: BiMap[String, Int]
) extends Serializable { ... }
```

PredictionIO will automatically store the returned model, i.e. `ALSModel` in this example.

### predict(...)

`predict` is called when you send a JSON query to
http://localhost:8000/queries.json. PredictionIO converts the query, such as `{ "user": "u2", "items": ["i1", "i3", "i10", "i2", "i5", "i31", "i9"] }` to the `Query` class you defined previously.

To rank the calculated the ranked scores of the items, we first look up the feature vector of this user (if the user exists). Then we look up the feature vectors of the items in query (if the items exist). The score is the dot product of the user and item feature vectors. The items are then sorted by the score.

```scala

  def predict(model: ALSModel, query: Query): PredictedResult = {

    val itemStringIntMap = model.itemStringIntMap
    val productFeatures = model.productFeatures

    // default itemScores array if items are not ranked at all
    lazy val notRankedItemScores =
      query.items.map(i => ItemScore(i, 0)).toArray

    model.userStringIntMap.get(query.user).map { userIndex =>
      // lookup userFeature for the user
      model.userFeatures.get(userIndex)
    }.flatten // flatten Option[Option[Array[Double]]] to Option[Array[Double]]
    .map { userFeature =>
      val scores: Vector[Option[Double]] = query.items.toVector
        .par // convert to parallel collection for parallel lookup
        .map { iid =>
          // convert query item id to index
          val featureOpt: Option[Array[Double]] = itemStringIntMap.get(iid)
            // productFeatures may not contain the item
            .map (index => productFeatures.get(index))
            // flatten Option[Option[Array[Double]]] to Option[Array[Double]]
            .flatten

          featureOpt.map(f => dotProduct(f, userFeature))
        }.seq // convert back to sequential collection

      // check if all scores is None (get rid of all None and see if empty)
      val isAllNone = scores.flatten.isEmpty

      if (isAllNone) {
        logger.info(s"No productFeature for all items ${query.items}.")
        PredictedResult(
          itemScores = notRankedItemScores,
          isOriginal = true
        )
      } else {
        // sort the score
        val ord = Ordering.by[ItemScore, Double](_.score).reverse
        val sorted = query.items.zip(scores).map{ case (iid, scoreOpt) =>
          ItemScore(
            item = iid,
            score = scoreOpt.getOrElse[Double](0)
          )
        }.sorted(ord).toArray

        PredictedResult(
          itemScores = sorted,
          isOriginal = false
        )
      }
    }.getOrElse {
      logger.info(s"No userFeature found for user ${query.user}.")
      PredictedResult(
        itemScores = notRankedItemScores,
        isOriginal = true
      )
    }

  }

```

PredictionIO passes the returned `PredictedResult` object to *Serving*.

## Serving

The `serve` method of class `Serving` processes predicted result. It is also
responsible for combining multiple predicted results into one if you have more
than one predictive model. *Serving* then returns the final predicted result.
PredictionIO will convert it to a JSON response automatically.

In MyProductRanking/src/main/scala/***Serving.scala***,

```scala
class Serving
  extends LServing[Query, PredictedResult] {

  override
  def serve(query: Query,
    predictedResults: Seq[PredictedResult]): PredictedResult = {
    predictedResults.head
  }
}
```

When you send a JSON query to http://localhost:8000/queries.json,
`PredictedResult` from all models will be passed to `serve` as a sequence, i.e.
`Seq[PredictedResult]`.

> An engine can train multiple models if you specify more than one Algorithm
component in `object RecommendationEngine` inside ***Engine.scala***. Since only
one `ALSAlgorithm` is implemented by default, this `Seq` contains one element.
